#! /usr/bin/env python
#
# Main test driver.

from __future__ import print_function
import io
import os
import os.path
import sys
import shutil
import fnmatch
import optparse
import re
import tempfile
import subprocess
import copy
import glob
import fnmatch
import time
import multiprocessing
import multiprocessing.managers
import multiprocessing.sharedctypes
import xml.dom.minidom
import socket
import resource
import struct
import uuid
import tempfile
import threading
import signal
import atexit
import locale
from datetime import datetime
import platform as pform

try:
    import ConfigParser as configparser
except ImportError:
    import configparser

VERSION = "0.59-12" # Automatically filled in.

using_py3 = (sys.version_info[0] == 3)

Name = "btest"
Config = None

try:
    ConfigDefault = os.environ["BTEST_CFG"]
except KeyError:
    ConfigDefault = "btest.cfg"

def output(msg, nl=True, file=None):
    if not file:
        file = sys.stderr

    if nl:
        print(msg, file=file)
    else:
        print(msg, end=" ", file=file)

def warning(msg):
    print("warning: %s" % msg, file=sys.stderr)

def error(msg):
    print(msg, file=sys.stderr)
    sys.exit(1)

def mkdir(dir):
    if not os.path.exists(dir):
        try:
            os.makedirs(dir)
        except OSError as e:
            error("cannot create directory %s: %s" % (dir, e))

    else:
        if not os.path.isdir(dir):
            error("path %s exists but is not a directory" % dir)

def which(cmd):
    # Adapted from http://stackoverflow.com/a/377028
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    (fpath, fname) = os.path.split(cmd)

    if fpath:
        if is_exe(cmd):
            return cmd

    else:
        for path in os.environ["PATH"].split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, cmd)
            if is_exe(exe_file):
                return exe_file

    return None

def platform():
    return pform.system()

def getDefaultBtestEncoding():
    if locale.getdefaultlocale()[1] is None:
        return 'utf-8'

    return locale.getpreferredencoding()

# Get the value of the specified option in the specified section (or
# section "btest" if not specified), or return the specified default value
# if the option or section is not found.  The returned value has macros and
# backticks from the config file expanded, but if the default value is returned
# it will not be modified in any way.
def getOption(key, default, section="btest"):
    try:
        value = Config.get(section, key)
    except (configparser.NoSectionError, configparser.NoOptionError):
        return default

    return ExpandBackticks(value)

reBackticks = re.compile(r"`(([^`]|\`)*)`")

def readStateFile():
    try:
        # Read state file.
        tests = []

        for line in open(StateFile):
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            tests += [line]

        tests = findTests(tests)

    except IOError:
        return (False, [])

    return (True, tests)

# Expand backticks in a config option value and return the result.
def ExpandBackticks(origvalue):
    def _exec(m):
        cmd = m.group(1)
        if not cmd:
            return ""

        try:
            pp = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
        except OSError as e:
            error("cannot execute '%s': %s" % (cmd, e))

        out = pp.communicate()[0]
        if using_py3:
            out = out.decode()

        return out.strip()

    value = reBackticks.sub(_exec, origvalue)

    return value

# We monkey-patch the config parser to provide an alternative method that
# expands backticks in option values and does not include defaults in
# returned section items.
def cpItemsNoDefaults(self, section):
    # Get the keys from the specified section without anything from the
    # default section (the values are raw, so we need to fetch the actual
    # value below).
    try:
        items = self._sections[section].items()
    except KeyError:
        raise configparser.NoSectionError(section)

    result = {}

    for (key, rawvalue) in items:
        # Python 2 includes a key of "__name__" that we don't want (Python 3
        # doesn't include this)
        if not key.startswith("__"):
            # Expand macros such as %(testbase)s.
            value = self.get(section, key)
            # Expand backticks (if any) in the value.
            result[key] = ExpandBackticks(value)

    return result.items()

# Replace environment variables in string.
def replaceEnvs(s):
    def replace_with_env(m):
        try:
            return os.environ[m.group(1)]
        except KeyError:
            return ""

    return RE_ENV.sub(replace_with_env, s)

# Execute one of test's command line *cmdline*. *measure_time* indicates if
# timing measurement is desired. *kw_args* are further keyword arguments
# interpreted the same way as with subprocess.check_call().
# Returns a 3-tuple (success, rc, time) where the former two likewise
# have the same meaning as with runSubprocess(), and 'time' is an integer
# value corresponding to the commands execution time measured in some
# appropiate integer measure. If 'time' is negative, that's an indicator
# that time measurement wasn't possible and the value is to be ignored.
def runTestCommandLine(cmdline, measure_time, **kwargs):
    if measure_time and Timer:
        return Timer.timeSubprocess(cmdline, **kwargs)
    else:
        (success, rc) = runSubprocess(cmdline, **kwargs)
        return (success, rc, -1)

# Runs a subprocess. Takes same arguments as subprocess.check_call()
# and returns a 2-tuple (success, rc) where *success* is a boolean
# indicating if the command executed, and *rc* is its exit code if it did.
def runSubprocess(*args, **kwargs):
    def child(q):
        try:
            subprocess.check_call(*args, **kwargs)
            success = True
            rc = 0

        except subprocess.CalledProcessError as e:
            success = False
            rc = e.returncode

        except KeyboardInterrupt:
            success = False
            rc = 0

        q.put([success, rc])

    try:
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=child, args=(q,))
        p.start()
        result = q.get()
        p.join()

    except KeyboardInterrupt:
        # Bailout here directly as otherwise we'll a bunch of errors
        # from all the childs.
        os._exit(1)

    return result

def getcfgparser(defaults):
    if using_py3:
        configparser.ConfigParser.itemsNoDefaults = cpItemsNoDefaults
        cfg = configparser.ConfigParser(defaults)
    else:
        # For Python 2, use SafeConfigParser because it behaves the same
        # as ConfigParser in Python 3.
        configparser.SafeConfigParser.itemsNoDefaults = cpItemsNoDefaults
        cfg = configparser.SafeConfigParser(defaults)

    return cfg

# Description of an alternative configuration.
class Alternative:
    def __init__(self, name):
        self.name = name
        self.filters = {}
        self.substitutions = {}
        self.envs = {}

# Main class distributing the work across threads.
class TestManager(multiprocessing.managers.SyncManager):
    def __init__(self, *args, **kwargs):
        super(TestManager, self).__init__(*args, **kwargs)

    def run(self, tests, output_handler):
        self.start()

        output_handler.prepare(self)
        self._output_handler = output_handler
        self._lock = self.RLock()
        self._succeeded = multiprocessing.sharedctypes.RawValue('i', 0)
        self._failed = multiprocessing.sharedctypes.RawValue('i', 0)
        self._unstable = multiprocessing.sharedctypes.RawValue('i', 0)
        self._skipped = multiprocessing.sharedctypes.RawValue('i', 0)
        self._tests = self.list(tests)
        self._failed_tests = self.list([])
        self._num_tests = len(self._tests)
        self._timing = self.loadTiming()

        port_range = getOption("PortRange", "1024-65535")
        port_range_lo = int(port_range.split("-")[0])
        port_range_hi = int(port_range.split("-")[1])

        if port_range_lo > port_range_hi:
            error("invalid PortRange value: {0}".format(port_range))

        max_test_ports = 0
        test_with_most_ports = None

        for t in self._tests:
            if len(t.ports) > max_test_ports:
                max_test_ports = len(t.ports)
                test_with_most_ports = t

        if max_test_ports > port_range_hi - port_range_lo + 1:
            error("PortRange {0} cannot satisfy requirement of {1} ports in test {2}".format(port_range, max_test_ports, test_with_most_ports.name))

        self._ports = self.list([p for p in range(port_range_lo, port_range_hi + 1)])

        num_threads = Options.threads

        if num_threads:
            threads = []

            for i in range(num_threads):
                t = multiprocessing.Process(name="#%d" % (i+1), target=lambda: self.threadRun(i))
                t.start()
                threads += [t]

            for t in threads:
                t.join()

        else:
            # No threads, just run all directly.
            self.threadRun(0)

        # Record failed tests if not updating.
        if Options.mode != "UPDATE" and Options.mode != "UPDATE_INTERACTIVE":
            try:
                state = open(StateFile, "w")
            except IOError as e:
                error("cannot open state file %s" % StateFile)

            for t in self._failed_tests:
                print(t, file=state)

            state.close()

        return (self._succeeded.value, self._failed.value, self._skipped.value, self._unstable.value)

    def percentage(self):
        if not self._num_tests:
            return 0

        count = self._succeeded.value + self._failed.value + self._skipped.value
        return 100.0 * count / self._num_tests

    def threadRun(self, thread_num):
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        all_tests = []

        while True:
            tests = self.nextTests(thread_num)
            if tests is None:
                # No more work for us.
                return

            all_tests += tests

            for t in tests:
                t.run(self)
                self.testReplayOutput(t)

            if Options.update_times:
                self.saveTiming(all_tests)

    def rerun (self, test):
        test.reruns += 1
        self._tests += [test.clone()]

    def nextTests(self, thread_num):
        with self._lock:
            for i in range(len(self._tests)):
                t = self._tests[i]

                if not t:
                    continue

                if Options.threads and t.serialize:
                    if hash(t.serialize) % Options.threads != thread_num:
                        # Not ours.
                        continue

                # We'll execute it, delete from queue.
                del self._tests[i]

                if Options.alternatives:
                    tests = []

                    for alternative in Options.alternatives:

                        if alternative in t.ignore_alternatives:
                            continue

                        if t.include_alternatives and alternative not in t.include_alternatives:
                            continue

                        alternative_test = copy.deepcopy(t)

                        if alternative == "-":
                            alternative = ""

                        alternative_test.setAlternative(alternative)
                        tests += [alternative_test]

                else:
                    if t.include_alternatives and "default" not in t.include_alternatives:
                        tests = []

                    elif "default" in t.ignore_alternatives:
                        tests = []

                    else:
                        tests = [t]

                return tests

        # No more tests for us.
        return None

    def returnPorts(self, ports):
        with self._lock:
            for p in ports:
                self._ports.append(p)

    def getAvailablePorts(self, count):
        with self._lock:

            if count > len(self._ports):
                return []

            first_port = -1
            rval = []

            for i in range(count):
                while True:
                    if len(self._ports) == 0:
                        for s in rval:
                            s.close()
                            self._ports.append(s.getsockname()[1])
                        return []

                    next_port = self._ports[0]

                    if next_port == first_port:
                        # Looped over port pool once, bail out.
                        for s in rval:
                            s.close()
                            self._ports.append(s.getsockname()[1])

                        return []

                    if first_port == -1:
                        first_port = next_port

                    del self._ports[0]

                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

                    # Setting REUSEADDR would allow ports to be recycled
                    # more quickly, but on macOS, seems to also have the
                    # effect of allowing multiple sockets to bind to the
                    # same port, even if REUSEPORT is off, so just try to
                    # ensure both are off.
                    if hasattr(socket, 'SO_REUSEADDR'):
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 0)
                    if hasattr(socket, 'SO_REUSEPORT'):
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 0)

                    try:
                        sock.bind(('', next_port))
                    except:
                        self._ports.append(next_port)
                        continue
                    else:
                        break

                rval.append(sock)

            return rval

    def lock(self):
        return self._lock

    def testStart(self, test):
        with self._lock:
            self._output_handler.testStart(test)

    def testCommand(self, test, cmdline):
        with self._lock:
            self._output_handler.testCommand(test, cmdline)

    def testProgress(self, test, msg):
        with self._lock:
            self._output_handler.testProgress(test, msg)

    def testSucceeded(self, test):
        test.parseProgress()

        msg = "ok"

        if test.known_failure:
            msg += " (but expected to fail)"

        msg += test.timePostfix()

        with self._lock:
            if test.reruns == 0:
                self._succeeded.value += 1
                self._output_handler.testSucceeded(test, msg)
            else:
                self._failed.value -= 1
                self._unstable.value += 1
                msg += " on retry, unstable"
                self._output_handler.testUnstable(test,msg)

            self._output_handler.testFinished(test, msg)

    def testFailed(self, test):
        test.parseProgress()

        msg = "failed"

        if test.reruns > 0:
            msg += " on retry"

        if test.known_failure:
            msg += " (expected)"

        msg += test.timePostfix()

        with self._lock:
            self._output_handler.testFailed(test, msg)
            self._output_handler.testFinished(test, msg)

            if test.reruns == 0:
                self._failed.value += 1

                if not test.known_failure:
                    self._failed_tests += [test.name]

            if test.reruns < Options.retries and not test.known_failure:
                self.rerun(test)

    def testSkipped(self, test):
        msg = "not available, skipped"

        with self._lock:
            self._output_handler.testSkipped(test, msg)
            self._skipped.value += 1

    def testReplayOutput(self, test):
        with self._lock:
            self._output_handler.replayOutput(test)

    def testTimingBaseline(self, test):
        return self._timing.get(test.name, -1)

    # Returns the name of the file to store the timing baseline in for this host.
    def timingPath(self):
        id = uuid.uuid3(uuid.NAMESPACE_DNS, str(uuid.getnode()))
        return os.path.abspath(os.path.join(BaselineTimingDir, id.hex))

    # Loads baseline timing information for this host if available. Returns
    # empty directory if not.
    def loadTiming(self):
        timing = {}

        with self._lock:
            path = self.timingPath()

            if not os.path.exists(path):
                return {}

            for line in open(path):
                (k, v) = line.split()
                timing[k] = float(v)

        return timing

    # Updates the timing baseline for the given tests on this host.
    def saveTiming(self, tests):
        with self._lock:
            changed = False
            timing = self.loadTiming()

            for t in tests:
                if t and t.measure_time and t.utime >= 0:
                    changed = True
                    timing[t.name] = t.utime

            if not changed:
                return

            path = self.timingPath()
            (dir, base) = os.path.split(path)
            mkdir(dir)

            out = open(path, "w")

            for (k, v) in timing.items():
                print("%s %u" % (k, v), file=out)

            out.close()

# One @TEST-{EXEC,REQUIRES} command line.
class CmdLine:
    def __init__(self, cmdline, expect_success, part, file):
        self.cmdline = cmdline
        self.expect_success = expect_success
        self.part = part
        self.file = file

# One test.
class Test(object):
    def __init__(self, file = None, directory = None): # Allow dir to be directly defined

        if file is not None: self.dir = os.path.abspath(os.path.dirname(file))
        else: self.dir = directory

        self.name = None
        self.basename = None
        self.part = -1
        self.number = 1
        self.serialize = []
        self.ports = set()
        self.groups = set()
        self.cmdlines = []
        self.tmpdir = None
        self.diag = None
        self.verbose = None
        self.baseline = None
        self.alternative = None
        self.ignore_alternatives = []
        self.include_alternatives = []
        self.files = []
        self.requires = []
        self.copy_files = []
        self.start = None
        self.contents = []
        self.cloned = False
        self.known_failure = False
        self.measure_time = False
        self.doc = []
        self.utime = -1
        self.utime_base = -1
        self.utime_perc = 0.0
        self.utime_exceeded = False
        self.monitor = None
        self.monitor_quit = None
        self.reruns = 0

    def displayName(self):
        name = self.name

        if self.alternative:
            name = "%s [%s]" % (name, self.alternative)

        return name

    def setAlternative(self, alternative):
        self.alternative = alternative

        # Parse the test's content.
    def parse(self, content, file):
        cmds = {}
        for line in content:

            m = RE_IGNORE.search(line)
            if m:
                # Ignore this file.
                return False

            for (tag, regexp, multiple, optional, group1, group2) in Commands:
                m = regexp.search(line)

                if m:
                    value = None

                    if group1 >= 0:
                        value = m.group(group1)

                    if group2 >= 0:
                        value = (value, m.group(group2))

                    if not multiple:
                        if tag in cmds:
                            error("%s: %s defined multiple times." % (file, tag))

                        cmds[tag] = value

                    else:
                        try:
                            cmds[tag] += [value]
                        except KeyError:
                            cmds[tag] = [value]

        # Make sure all non-optional commands are there.
        for (tag, regexp, multiple, optional, group1, group2) in Commands:
            if not optional and tag not in cmds:
                if tag == "exec":
                    error("%s: mandatory keyword '@TEST-EXEC' or '@TEST-EXEC-FAIL' is missing." % file)
                else:
                    error("%s: mandatory %s command not found." % (file, tag))

        basename = file

        part = 1
        m = RE_PART.match(file)

        if m:
            basename = m.group(1)
            part = int(m.group(2))

        name = os.path.relpath(basename, TestBase)
        (name, ext) = os.path.splitext(name)

        name = name.replace("/", ".")
        while name.startswith("."):
            name = name[1:]

        self.name = name
        self.part = part
        self.basename = name
        self.contents += [(file, content)]

        for (cmd, success) in cmds["exec"]:
            cmdline = CmdLine(cmd.strip(), success != "-FAIL", part, file)
            self.cmdlines += [cmdline]

        if PartFinalizer != "":
            finalizer = CmdLine("%s %s" % (PartFinalizer, self.name), True, part, "<PartFinalizer>")
            self.cmdlines += [finalizer]

        if "serialize" in cmds:
            self.serialize = cmds["serialize"]

        if "port" in cmds:
            self.ports |= set(cmd.strip() for cmd in cmds['port'])

        if "group" in cmds:
            self.groups |= set(cmd.strip() for cmd in cmds["group"])

        if "requires" in cmds:
            for cmd in cmds["requires"]:
                cmdline = CmdLine(cmd.strip(), True, part, file)
                self.requires += [cmdline]

        if "copy-file" in cmds:
            self.copy_files += [cmd.strip() for cmd in cmds["copy-file"]]

        if "alternative" in cmds:
            self.include_alternatives = [cmd.strip() for cmd in cmds["alternative"]]

        if "not-alternative" in cmds:
            self.ignore_alternatives = [cmd.strip() for cmd in cmds["not-alternative"]]

        if "known-failure" in cmds:
            self.known_failure = True

        if "measure-time" in cmds:
            self.measure_time = True

        if "doc" in cmds:
            self.doc = cmds["doc"]

        return True

    # Copies all control information over to a new Test but replacing the test's
    # content with a new one.
    def clone(self, content=None):
        clone = Test("")
        clone.number = self.number + 1
        clone.basename = self.basename
        clone.name = "%s-%d" % (self.basename, clone.number)
        clone.reruns = self.reruns
        clone.serialize = self.serialize
        clone.ports = self.ports
        clone.groups = self.groups
        clone.cmdlines = self.cmdlines
        clone.known_failure = self.known_failure
        clone.measure_time = self.measure_time
        clone.doc = self.doc

        if content:
            assert(len(self.contents) == 1)
            clone.contents = [(self.contents[0][0], content)]
        else:
            clone.contents = self.contents

        self.cloned = True

        return clone

    def mergePart(self, part):

        if self.cloned or part.cloned:
            error("cannot use @TEST-START-NEXT with tests split across parts (%s)" % self.basename)

        self.serialize += part.serialize
        self.ports |= part.ports
        self.groups |= part.groups
        self.cmdlines += part.cmdlines
        self.ignore_alternatives += part.ignore_alternatives
        self.include_alternatives += part.include_alternatives
        self.files += part.files
        self.requires += part.requires
        self.copy_files += part.copy_files
        self.contents += part.contents
        self.doc += part.doc
        self.known_failure |= part.known_failure
        self.measure_time |= part.measure_time

    def getPorts(self, mgr, count):
        if not count:
            return []

        attempts = 5

        while True:
            rval = mgr.getAvailablePorts(count)

            if rval:
                return rval

            attempts -= 1

            if attempts == 0:
                error("failed to obtain {0} ports for test {1}".format(
                    count, self.name))

            warning("failed to obtain {0} ports for test {1}, will try {2} more times".format(
                count, self.name, attempts))

            time.sleep(15)

    def run(self, mgr):
        bound_sockets = self.getPorts(mgr, len(self.ports))
        self.bound_ports = [s.getsockname()[1] for s in bound_sockets]

        for bs in bound_sockets:
            bs.close()

        self.start = time.time()
        self.mgr = mgr
        mgr.testStart(self)

        self.tmpdir = os.path.abspath(os.path.join(TmpDir, self.name))
        self.diag = os.path.join(self.tmpdir, ".diag")
        self.verbose = os.path.join(self.tmpdir, ".verbose")
        self.baseline = os.path.abspath(os.path.join(BaselineDir, self.name))
        self.diagmsgs = []
        self.utime = -1
        self.utime_base = self.mgr.testTimingBaseline(self)
        self.utime_perc = 0.0
        self.utime_exceeded = False

        self.rmTmp()
        mkdir(self.baseline)
        mkdir(self.tmpdir)

        for (fname, lines) in self.files:
            fname = os.path.join(self.tmpdir, fname)

            subdir = os.path.dirname(fname)

            if subdir != "":
                mkdir(subdir)
            try:
                ffile = open(fname, "w")
            except IOError as e:
                error("cannot write test's additional file '%s'" % fname)

            for line in lines:
                ffile.write(line)

            ffile.close()

        for file in self.copy_files:
            src = replaceEnvs(file)
            try:
                shutil.copy2(src, self.tmpdir)
            except IOError as e:
                error("cannot copy %s: %s" % (src, e))

        for (file, content) in self.contents:
            localfile = os.path.join(self.tmpdir, os.path.basename(file))
            out = io.open(localfile, "w", encoding=getDefaultBtestEncoding())

            try:
                for line in content:
                    out.write(line)
            except UnicodeEncodeError as e:
                error("unicode encode error in file %s: %s" % (localfile, e))

            out.close()

        self.log = open(os.path.join(self.tmpdir, ".log"), "w")
        self.stdout = open(os.path.join(self.tmpdir, ".stdout"), "w")
        self.stderr = open(os.path.join(self.tmpdir, ".stderr"), "w")

        for cmd in self.requires:
            (success, rc) = self.execute(cmd, apply_alternative=self.alternative)

            if not success:
                self.mgr.testSkipped(self)
                if not Options.tmps:
                    self.rmTmp()
                self.finish()
                return

        # Spawn thread that monitors for progress updates.
        # Note: We do indeed spawn a thread here, not a process, so
        # that the callback can modify the test object to maintain
        # state.
        def monitor_cb():
            while not self.monitor_quit.is_set():
                self.parseProgress()
                time.sleep(0.1)

        self.monitor = threading.Thread(target=monitor_cb)
        self.monitor_quit = threading.Event()
        self.monitor.start()

        # Run test's commands.

        failures = 0

        cmds = []

        if Initializer != "":
            initializer = CmdLine("%s %s" % (Initializer, self.name), True, 1, "<Initializer>")
            cmds += [initializer]

        cmds += self.cmdlines

        if Finalizer != "":
            finalizer = CmdLine("%s %s" % (Finalizer, self.name), True, 1, "<Finalizer>")
            cmds += [finalizer]

        skip_part = -1

        for cmd in cmds:
            if skip_part >= 0 and skip_part == cmd.part:
                continue

            (success, rc) = self.execute(cmd, apply_alternative=self.alternative)

            if not success:
                failures += 1

                if Options.sphinx:
                    # We still execute the remaining commands and
                    # raise a failure for each one that fails.
                    self.mgr.testFailed(self)
                    skip_part = cmd.part
                    continue

                if failures == 1:
                    self.mgr.testFailed(self)

                if rc == 200:
                    # Abort all tests.
                    self.monitor_quit.set()
                    sys.exit(1)

                if rc != 100:
                    break

        self.utime_perc = 0.0
        self.utime_exceeded = False

        if failures == 0:
            # If we don't have a timing baseline, we silently ignore that so that
            # on systems that can't measure execution time, the test will just pass.
            if self.utime_base >= 0 and self.utime >= 0:
                delta = getOption("TimingDeltaPerc", "1.0")
                self.utime_perc = (100.0 * (self.utime - self.utime_base) / self.utime_base)
                self.utime_exceeded = (abs(self.utime_perc) > float(delta))

            if self.utime_exceeded and not Options.update_times:
                self.diagmsgs += ["'%s' exceeded permitted execution time deviation%s" % (self.name, self.timePostfix())]
                self.mgr.testFailed(self)

            else:
                self.mgr.testSucceeded(self)

            if not Options.tmps and self.reruns == 0:
                self.rmTmp()

        self.finish()

    def finish(self):
        if self.bound_ports:
            self.mgr.returnPorts([p for p in self.bound_ports])

        self.bound_ports = []

        try:
            # Try removing the baseline directory. If it works, it's empty, i.e., no baseline was created.
            os.rmdir(self.baseline)
        except OSError:
            pass

        self.log.close()
        self.stdout.close()
        self.stderr.close()

        if self.monitor:
            self.monitor_quit.set()
            self.monitor.join()

    def execute(self, cmd, apply_alternative=None):
        filter_cmd = None
        addl_envs = {}

        cmdline = cmd.cmdline

        # Apply alternative if requested.
        if apply_alternative:

            alt = Alternatives[apply_alternative]

            try:
                (path, executable) = os.path.split(cmdline.split()[0])
                filter_cmd = alt.filters[executable]
            except LookupError:
                pass

            for (key, val) in alt.substitutions.items():
                cmdline = re.sub("\\b" + re.escape(key) + "\\b", val, cmdline)

            addl_envs = alt.envs

        localfile = os.path.join(self.tmpdir, os.path.basename(cmd.file))

        if filter_cmd and cmd.expect_success: # Do not apply filter if we expect failure.
            # This is not quite correct as it does not necessarily need to be
            # the %INPUT file which we are filtering ...
            filtered = os.path.join(self.tmpdir, "filtered-%s" % os.path.basename(localfile))

            filter = CmdLine("%s %s %s" % (filter_cmd, localfile, filtered), True, 1, "<Filter>")

            (success, rc) = self.execute(filter, apply_alternative=None)
            if not success:
                return (False, rc)

            mv = CmdLine("mv %s %s" % (filtered, localfile), True, 1, "<Filter-Move>")
            (success, rc) = self.execute(mv, apply_alternative=None)

            if not success:
                return (False, rc)

        self.mgr.testCommand(self, cmd)

        # Replace special names.

        if localfile:
            cmdline = RE_INPUT.sub(localfile, cmdline)

        cmdline = RE_DIR.sub(self.dir, cmdline)

        print("%s (expect %s)" % (cmdline, ("failure", "success")[cmd.expect_success]), file=self.log)

        env = self.prepareEnv(cmd, addl_envs)
        measure_time = self.measure_time and (Options.update_times or self.utime_base >= 0)

        (success, rc, utime) = runTestCommandLine(cmdline, measure_time, cwd=self.tmpdir, shell=True, env=env, stderr=self.stderr, stdout=self.stdout)

        if utime > 0:
            self.utime += utime

        if success:
            if cmd.expect_success:
                return (True, rc)

            self.diagmsgs += ["'%s' succeeded unexpectedly (exit code 0)" % cmdline]
            return (False, 0)

        else:
            if not cmd.expect_success:
                return (True, rc)

            self.diagmsgs += ["'%s' failed unexpectedly (exit code %s)" % (cmdline, rc)]
            return (False, rc)

    def rmTmp(self):
        try:
            if os.path.isfile(self.tmpdir):
                os.remove(self.tmpdir)

            if os.path.isdir(self.tmpdir):
                subprocess.call("rm -rf %s 2>/dev/null" % self.tmpdir, shell=True)

        except OSError as e:
            error("cannot remove tmp directory %s: %s" % (self.tmpdir, e))

    # Prepares the environment for the child processes.
    def prepareEnv(self, cmd, addl={}):
        env = copy.deepcopy(os.environ)

        env["TEST_BASELINE"] = self.baseline
        env["TEST_DIAGNOSTICS"] = self.diag
        env["TEST_MODE"] = Options.mode.upper()
        env["TEST_NAME"] = self.name
        env["TEST_VERBOSE"] = self.verbose
        env["TEST_PART"] = str(cmd.part)
        env["TEST_BASE"] = TestBase

        for (key, val) in addl.items():
            env[key.upper()] = val

        for idx, key in enumerate(sorted(self.ports)):
            env[key] = str(self.bound_ports[idx]) + "/tcp"

        return env

    def addFiles(self, files):
        # files is a list of tuple (fname, lines).
        self.files = files

    # If timing information is requested and available returns a
    # string that summarizes the time spent for the test.
    # Otherwise, returns an empty string.
    def timePostfix(self):
        if self.utime_base >= 0 and self.utime >= 0:
            return " (%+.1f%%)" % self.utime_perc
        else:
            return ""

    # Picks up any progress output that has a test has written out.
    def parseProgress(self):
        path = os.path.join(self.tmpdir, ".progress.*")
        for file in sorted(glob.glob(path)):
            try:
                for line in open(file):
                    msg = line.strip()
                    self.mgr.testProgress(self, msg)

                os.unlink(file)
            except (IOError, OSError):
                pass

### Output handlers.

class OutputHandler:
    def __init__(self, options):
        """Base class for reporting progress and results to user. We derive
        several classes from this one, with the one being used depending on
        which output the users wants.

        A handler's method are called from test TestMgr and may be called
        interleaved from different tests. However, the TestMgr locks before
        each call so that it's guaranteed that two calls don't run
        concurrently.

        options: An optparser with the global options.
        """
        self._buffered_output = {}
        self._options = options

    def prepare(self, mgr):
        """The TestManager calls this with itself as an argument just before
        it starts running tests."""
        pass

    def options(self):
        """Returns the current optparser instance."""
        return self._options

    def threadPrefix(self):
        """In threaded mode, returns a string with the thread's name in a form
        suitable to prefix output with. In non-threaded mode, returns the
        empty string."""
        if self.options().threads:
            return "[%s]" % multiprocessing.current_process().name
        else:
            return ""

    def _output(self, msg, nl=True, file=None):
        if not file:
            file = sys.stderr

        if nl:
            print(msg, file=file)
        else:
            if msg:
                print(msg, end=" ", file=file)

    def output(self, test, msg, nl=True, file=None):
        """Output one line of output to user. In non-threaded mode, this will
        be printed out directly to stderr. In threaded-mode, this will be
        buffered until the test has finished; then all output is printed as a
        block.

        This should only be called from other members of this class, or
        derived classes, not from tests.
        """
        if not self.options().threads:
            self._output(msg, nl, file)
            return

        else:
            try:
                self._buffered_output[test.name] += [(msg, nl, file)]
            except KeyError:
                self._buffered_output[test.name] = [(msg, nl, file)]

    def replayOutput(self, test):
        """Prints out all output buffered in threaded mode by output()."""
        if test.name not in self._buffered_output:
            return

        for (msg, nl, file) in self._buffered_output[test.name]:
            self._output(msg, nl, file)

        self._buffered_output[test.name] = []

    # Methods to override.
    def testStart(self, test):
        """Called just before a test begins."""
        pass

    def testCommand(self, test, cmdline):
        """Called just before a command line is exected for a trace."""
        pass

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        pass

    def testSucceeded(self, test, msg):
        """Called when a test was successful."""
        pass

    def testFailed(self, test, msg):
        """Called when a test failed."""
        pass

    def testSkipped(self, test, msg):
        """Called when a test is skipped because its dependencies aren't met."""
        pass

    def testFinished(self, test, msg):
        """
        Called just after a test has finished being processed, independent of
        success or failure. Not called for skipped tests.
        """
        pass

    def testUnstable(self,test,msg):
        """Called when a test failed initially but succeeded in a retry."""
        pass

    def finished(self):
        """Called when all tests have been executed."""
        pass

class Forwarder(OutputHandler):
    """
    Forwards output to several other handlers.

    options: An optparser with the global options.

    handlers: List of output handlers to forward to.
    """

    def __init__(self, options, handlers):
        OutputHandler.__init__(self, options)
        self._handlers = handlers

    def prepare(self, mgr):
        """Called just before test manager starts running tests."""
        for h in self._handlers:
            h.prepare(mgr)

    def testStart(self, test):
        """Called just before a test begins."""
        for h in self._handlers:
            h.testStart(test)

    def testCommand(self, test, cmdline):
        """Called just before a command line is exected for a trace."""
        for h in self._handlers:
            h.testCommand(test, cmdline)

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        for h in self._handlers:
            h.testProgress(test, msg)

    def testSucceeded(self, test, msg):
        """Called when a test was successful."""
        for h in self._handlers:
            h.testSucceeded(test, msg)

    def testFailed(self, test, msg):
        """Called when a test failed."""
        for h in self._handlers:
            h.testFailed(test, msg)

    def testUnstable(self, test, msg):
        """Called when a test failed initially but succeeded in a retry."""
        for h in self._handlers:
            h.testUnstable(test, msg)

    def testSkipped(self, test, msg):
        for h in self._handlers:
            h.testSkipped(test, msg)

    def replayOutput(self, test):
        for h in self._handlers:
            h.replayOutput(test)

    def finished(self):
        for h in self._handlers:
            h.finished()

class Standard(OutputHandler):
    def testStart(self, test):
        self.output(test, self.threadPrefix(), nl=False)
        self.output(test, "%s ..." % test.displayName(), nl=False)
        test._std_nl = False

    def testCommand(self, test, cmdline):
        pass

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        if not test._std_nl:
            self.output(test, "")

        self.output(test, "  - " + msg)
        test._std_nl = True

    def testSucceeded(self, test, msg):
        sys.stdout.flush()
        self.finalMsg(test, msg)

    def testFailed(self, test, msg):
        self.finalMsg(test, msg)

    def testSkipped(self, test, msg):
        self.finalMsg(test, msg)

    def finalMsg(self, test, msg):
        if test._std_nl:
            self.output(test, self.threadPrefix(), nl=False)
            self.output(test, "%s ..." % test.displayName(), nl=False)

        self.output(test, msg)

    def testUnstable(self,test,msg):
        self.finalMsg(test,msg)

class Console(OutputHandler):
    """
    Output handler that writes colorful progress report to the console.

    This handler works well in settings that can handle coloring but not
    cursor placement commands (for example because moving to the beginning of
    the line overwrites other surrounding output); it's what the
    ``--show-all`` output uses. In contrast, the *CompactConsole* handler uses
    cursor placement in addition for a more space-efficient output.
    """
    Green      = "\033[32m"
    Red        = "\033[31m"
    Yellow     = "\033[33m"
    Gray       = "\033[37m"
    DarkGray   = "\033[1;30m"
    Normal     = "\033[0m"

    def __init__(self, options):
        OutputHandler.__init__(self, options)
        self.show_all = True

    def testStart(self, test):
        msg = "[%3d%%] %s ..." % (test.mgr.percentage(), test.displayName())
        self._consoleOutput(test, msg, False)

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        msg = self.DarkGray + "(%s)" % msg + self.Normal
        self._consoleOutput(test, msg, True)

    def testSucceeded(self, test, msg):
        if test.known_failure:
            msg = self.Yellow + msg + self.Normal
        else:
            msg = self.Green + msg + self.Normal

        self._consoleOutput(test, msg, self.show_all)

    def testFailed(self, test, msg):
        if test.known_failure:
            msg = self.Yellow + msg + self.Normal
        else:
            msg = self.Red + msg + self.Normal

        self._consoleOutput(test, msg, True)

    def testUnstable (self, test, msg):
        msg = self.Yellow + msg + self.Normal
        self._consoleOutput(test, msg, True)

    def testSkipped(self, test, msg):
        msg = self.Gray + msg + self.Normal
        self._consoleOutput(test, msg, self.show_all)

    def finished(self):
        sys.stdout.flush()

    def _consoleOutput(self, test, msg, sticky):
        self._consoleWrite(test, msg, sticky)

    def _consoleWrite(self, test, msg, sticky):
        sys.stdout.write(msg.strip() + " ")

        if sticky:
            sys.stdout.write("\n")

        sys.stdout.flush()

class CompactConsole(Console):
    """
    Output handler that writes compact, colorful progress report to
    the console while also keeping the output compact by keeping
    output only for failing tests.

    This handler adds cursor mods and navigation to the coloring provided by
    the Console class and hence needs settings that can handle both.
    """
    CursorOff  = "\033[?25l"
    CursorOn   = "\033[?25h"
    EraseToEndOfLine = "\033[2K"

    def __init__(self, options):
        Console.__init__(self, options)
        self.show_all = False

        def cleanup():
            sys.stdout.write(self.CursorOn)

        atexit.register(cleanup)

    def testStart(self, test):
        test.console_last_line = None
        self._consoleOutput(test, "", False)
        sys.stdout.write(self.CursorOff)

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        msg = " " + self.DarkGray + "(%s)" % msg + self.Normal
        self._consoleAugment(test, msg)

    def testFinished(self, test):
        test.console_last_line = None

    def finished(self):
        sys.stdout.write(self.EraseToEndOfLine)
        sys.stdout.write("\r")
        sys.stdout.write(self.CursorOn)
        sys.stdout.flush()

    def _consoleOutput(self, test, msg, sticky):
        line = "[%3d%%] %s ..." % (test.mgr.percentage(), test.displayName())

        if msg:
            line += " " + msg

        test.console_last_line = line
        self._consoleWrite(test, line, sticky)

    def _consoleAugment(self, test, msg):
        sys.stdout.write(self.EraseToEndOfLine)
        sys.stdout.write(" %s" % msg.strip())
        sys.stdout.write("\r%s" % test.console_last_line)
        sys.stdout.flush()

    def _consoleWrite(self, test, msg, sticky):
        sys.stdout.write(chr(27) + '[2K')
        sys.stdout.write("\r%s" % msg.strip())

        if sticky:
            sys.stdout.write("\n")
            test.console_last_line = None

        sys.stdout.flush()

class Brief(OutputHandler):
    """Output handler for producing the brief output format."""
    def testStart(self, test):
        pass

    def testCommand(self, test, cmdline):
        pass

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        pass

    def testSucceeded(self, test, msg):
        pass

    def testFailed(self, test, msg):
        self.output(test, self.threadPrefix(), nl=False)
        self.output(test, "%s ... %s" % (test.displayName(), msg))

    def testUnstable (self, test, msg):
        self.output(test, self.threadPrefix(), nl=False)
        self.output(test, "%s ... %s" % (test.displayName(), msg))

    def testSkipped(self, test, msg):
        pass

class Verbose(OutputHandler):
    """Output handler for producing the verbose output format."""

    def testStart(self, test):
        self.output(test, self.threadPrefix(), nl=False)
        self.output(test, "%s ..." % test.displayName())

    def testCommand(self, test, cmdline):
        part = ""

        if cmdline.part > 1:
            part = " [part #%d]" % cmdline.part

        self.output(test, self.threadPrefix(), nl=False)
        self.output(test, "  > %s%s" % (cmdline.cmdline, part))

    def testProgress(self, test, msg):
        """Called when a test signals having made progress."""
        self.output(test, "  - " + msg)

    def testSucceeded(self, test, msg):
        self.output(test, self.threadPrefix(), nl=False)
        self.showTestVerbose(test)
        self.output(test, "... %s %s" % (test.displayName(), msg))

    def testFailed(self, test, msg):
        self.output(test, self.threadPrefix(), nl=False)
        self.showTestVerbose(test)
        self.output(test, "... %s %s" % (test.displayName(), msg))

    def testUnstable(self, test, msg):
        self.output(test, self.threadPrefix(), nl=False)
        self.showTestVerbose(test)
        self.output(test, "... %s %s" % (test.displayName(), msg))

    def testSkipped(self, test, msg):
        self.output(test, self.threadPrefix(), nl=False)
        self.showTestVerbose(test)
        self.output(test, "... %s %s" % (test.displayName(), msg))

    def showTestVerbose(self, test):
        if not os.path.exists(test.verbose):
            return

        for line in open(test.verbose):
            self.output(test, "  > [test-verbose] %s" % line.strip())

class Diag(OutputHandler):
    def __init__(self, options, all=False, file=None):
        """Output handler for producing the diagnostic output format.

        options: An optparser with the global options.

        all: Print diagnostics also for succeeding tests.

        file: Output into given file rather than console.
        """
        OutputHandler.__init__(self, options)
        self._all = all
        self._file = file

    def showDiag(self, test):
        """Generates diagnostics for a test."""
        for line in test.diagmsgs:
            self.output(test, "  % " + line, True, self._file)

        for f in (test.diag, os.path.join(test.tmpdir, ".stderr")):
            if not f:
                continue

            if os.path.isfile(f):
                self.output(test, "  % cat " + os.path.basename(f), True, self._file)
                for line in open(f):
                    self.output(test, "  " + line.rstrip(), True, self._file)
                self.output(test, "", True, self._file)

        if self.options().wait and not self._file:
            self.output(test, "<Enter> ...")
            try:
                sys.stdin.readline()
            except KeyboardInterrupt:
                sys.exit(1)

    def testCommand(self, test, cmdline):
        pass

    def testSucceeded(self, test, msg):
        if self._all:
            if self._file:
                self.output(test, "%s ... %s" % (test.displayName(), msg), True, self._file)

            self.showDiag(test)

    def testFailed(self, test, msg):
        if self._file:
            self.output(test, "%s ... %s" % (test.displayName(), msg), True, self._file)

        if not test.known_failure:
            self.showDiag(test)

    def testUnstable(self, test, msg):
        if self._file:
            self.output(test, "%s ... %s" % (test.displayName(), msg), True, self._file)

    def testSkipped(self, test, msg):
        if self._file:
            self.output(test, "%s ... %s" % (test.displayName(), msg), True, self._file)

class SphinxOutput(OutputHandler):
    def __init__(self, options, all=False, file=None):
        """Output handler for producing output when running from
        Sphinx. The main point here is that we save all diagnostic output to
        $BTEST_RST_OUTPUT.

        options: An optparser with the global options.
        """
        OutputHandler.__init__(self, options)

        self._output = None

        try:
            self._rst_output = os.environ["BTEST_RST_OUTPUT"]
        except KeyError:
            print("warning: environment variable BTEST_RST_OUTPUT not set, will not produce output", file=sys.stderr)
            self._rst_output = None

    def testStart(self, test):
        self._output = None

    def testCommand(self, test, cmdline):
        if not self._rst_output:
            return

        self._output = "%s#%s" % (self._rst_output, cmdline.part)
        self._part = cmdline.part

    def testSucceeded(self, test, msg):
        pass

    def testFailed(self, test, msg):
        if not self._output:
            return

        out = open(self._output, "a")

        print("\n.. code-block:: none ", file=out)
        print("\n  ERROR executing test '%s' (part %s)\n" % (test.displayName(), self._part), file=out)

        for line in test.diagmsgs:
            print("  % " + line, file=out)

        test.diagmsgs = []

        for f in (test.diag, os.path.join(test.tmpdir, ".stderr")):
            if not f:
                continue

            if os.path.isfile(f):
                print("  % cat " + os.path.basename(f), file=out)
                for line in open(f):
                    print("   %s" % line.strip(), file=out)
                print(file=out)

    def testUnstable(self, test, msg):
        pass

    def testSkipped(self, test, msg):
        pass

class XMLReport(OutputHandler):

    RESULT_PASS = "pass"
    RESULT_FAIL = "failure"
    RESULT_SKIP = "skipped"
    RESULT_UNSTABLE = "unstable"

    def __init__(self, options, xmlfile):
        """Output handler for producing an XML report of test results.

        options: An optparser with the global options.

        file: Output into given file
        """
        OutputHandler.__init__(self, options)
        self._file = xmlfile
        self._start = time.time()
        self._timestamp = datetime.now().isoformat()

    def prepare(self, mgr):
        self._results = mgr.list([])

    def testStart(self, test):
        pass

    def testCommand(self, test, cmdline):
        pass

    def makeTestCaseElement(self, doc, testsuite, name, duration):
        parts = name.split('.')
        if len(parts) > 1:
            classname = ".".join(parts[:-1])
            name = parts[-1]
        else:
            classname = parts[0]
            name = parts[0]

        e = doc.createElement("testcase")
        e.setAttribute("classname", classname)
        e.setAttribute("name", name)
        e.setAttribute("time", str(duration))
        testsuite.appendChild(e)

        return e

    def getContext(self, test, context_file):
        context = ""
        for line in test.diagmsgs:
            context += "  % " + line + "\n"

        for f in (test.diag, os.path.join(test.tmpdir, context_file)):
            if not f:
                continue

            if os.path.isfile(f):
                context += "  % cat " + os.path.basename(f) + "\n"
                for line in open(f):
                    context += "  " + line.strip() + "\n"

        return context

    def addTestResult(self, test, status):
        context = ""

        if status != self.RESULT_PASS:
            context = self.getContext(test, ".stderr")

        res = {"name": test.displayName(),
               "status": status,
               "context": context,
               "duration": time.time() - test.start,
               }

        self._results.append(res)

    def testSucceeded(self, test, msg):
        self.addTestResult(test, self.RESULT_PASS)

    def testFailed(self, test, msg):
        self.addTestResult(test, self.RESULT_FAIL)

    def testUnstable(self,test,msg):
        self.addTestResult(test, self.RESULT_UNSTABLE)

    def testSkipped(self, test, msg):
        self.addTestResult(test, self.RESULT_SKIP)

    def finished(self):
        num_tests = 0
        num_failures = 0
        doc = xml.dom.minidom.Document()
        testsuite = doc.createElement("testsuite")
        doc.appendChild(testsuite)

        for res in self._results:
            test_case = self.makeTestCaseElement(doc, testsuite,
                                                 res["name"], res["duration"])

            if res["status"] != self.RESULT_PASS:
                e = doc.createElement(res["status"])
                e.setAttribute("type", res["status"])
                text_node = doc.createTextNode(res["context"])
                e.appendChild(text_node)
                test_case.appendChild(e)

                if res["status"] == self.RESULT_FAIL:
                    num_failures += 1

            num_tests += 1

        testsuite.setAttribute("time", str(time.time() - self._start))
        testsuite.setAttribute("tests", str(num_tests))
        testsuite.setAttribute("failures", str(num_failures))
        testsuite.setAttribute("errors", str(0))

        testsuite.setAttribute("timestamp", self._timestamp)
        testsuite.setAttribute("hostname", socket.gethostname())

        print(doc.toprettyxml(indent="    "), file=self._file)
        self._file.close()

### Timing measurements.

# Base class for all timers.
class TimerBase:
    # Returns true if time measurement are supported by this class on the
    # current platform. Must be overidden by derived classes.
    def available(self):
        raise NotImplementedError("Timer.available not implemented")

    # Runs a subprocess and measures its execution time. Arguments are as with
    # runSubprocess. Return value is the same with runTestCommandLine(). This
    # method must only be called if available() returns True. Must be overidden
    # by derived classes.
    def timeSubprocess(self, *args, **kwargs):
        raise NotImplementedError("Timer.timeSubprocess not implemented")

# Linux version of time measurements. Uses "perf".
class LinuxTimer(TimerBase):
    def __init__(self):
        self.perf = getOption("PerfPath", which("perf"))

    def available(self):
        if not platform() == "Linux":
            return False

        if not self.perf or not os.path.exists(self.perf):
            return False

        # Make sure it works.
        (success, rc) = runSubprocess("%s stat -o /dev/null true 2>/dev/null" % self.perf, shell=True)
        return success and rc == 0

    def timeSubprocess(self, *args, **kwargs):
        assert self.perf

        cargs = args
        ckwargs = kwargs

        targs = [self.perf, "stat", "-o", ".timing", "-x", " ", "-e", "instructions", "sh", "-c"]
        targs += [" ".join(cargs)]
        cargs = [targs]
        del ckwargs["shell"]

        (success, rc) = runSubprocess(*cargs, **ckwargs)

        utime = -1

        try:
            cwd = kwargs["cwd"] if "cwd" in kwargs else "."
            for line in open(os.path.join(cwd, ".timing")):
                if "instructions" in line and "not supported" not in line:
                    try:
                        m = line.split()
                        utime = int(m[0])
                    except ValueError:
                        pass

        except IOError:
            pass

        return (success, rc, utime)

# Walk the given directory and return all test files.
def findTests(paths, expand_globs=False):
    tests = []

    ignore_files = getOption("IgnoreFiles", "").split()
    ignore_dirs = getOption("IgnoreDirs", "").split()

    expanded = []

    for p in paths:
        p = os.path.join(TestBase, p)

        if expand_globs:
            expanded += [d for d in glob.glob(p) if os.path.isdir(d)]
        else:
            expanded.append(p)

    for path in expanded:
        rpath = os.path.relpath(path, TestBase)

        if os.path.isdir(path) and os.path.basename(path) in ignore_dirs:
            continue

        ignores = [os.path.join(path, dir) for dir in ignore_dirs]

        m = RE_PART.match(rpath)
        if m:
            error("Do not specify files with part numbers directly, use the base test name (%s)" % rpath)

        if os.path.isfile(path):
            tests += readTestFile(path)

            # See if there are more parts.
            for part in glob.glob("%s#*" % rpath):
                tests += readTestFile(part)

        elif os.path.isdir(path):
            for (dirpath, dirnames, filenames) in os.walk(path):

                ign = os.path.join(dirpath, ".btest-ignore")

                if os.path.isfile(os.path.join(ign)):
                    del dirnames[0:len(dirnames)]
                    continue

                for file in filenames:
                    for gl in ignore_files:
                        if fnmatch.fnmatch(file, gl):
                            break
                    else:
                        tests += readTestFile(os.path.join(dirpath, file))

                # Don't recurse into these.
                for (dir, path) in [(dir, os.path.join(dirpath, dir)) for dir in dirnames]:
                    for skip in ignores:
                        if path == skip:
                            dirnames.remove(dir)

        else:
            # See if we have test(s) named like this in our configured set.
            found = False
            for t in Config.configured_tests:
                if t and rpath == t.name:
                    tests += [t]
                    found = True

            if not found:
                # See if there are parts.
                for part in glob.glob("%s#*" % rpath):
                    tests += readTestFile(part)
                    found = True

                if not found:
                    error("cannot read %s" % path)

    return tests

# Merge parts belonging to the same test into one.
def mergeTestParts(tests):
    def key(t):
        return (t.basename, t.number, t.part)

    out = {}

    for t in sorted(tests, key=key):
        try:
            other = out[t.name]

            assert t.part != other.part
            out[t.name].mergePart(t)

        except KeyError:
            out[t.name] = t

    return sorted([t for t in out.values()], key=key)

# Read the given test file and instantiate one or more tests from it.
def readTestFile(filename):
    def newTest(content, previous):
        if not previous:
            t = Test(filename)
            if t.parse(content, filename):
                return t
            else:
                return None
        else:
            return previous.clone(content)

    if os.path.basename(filename) == ".btest-ignore":
        return []

    try:
        input = io.open(filename, encoding=getDefaultBtestEncoding())
    except IOError as e:
        error("cannot read test file: %s" % e)

    tests = []
    files = []

    content = []
    previous = None
    file = (None, [])

    state = "test"

    try:
        lines = [line for line in input]
    except UnicodeDecodeError as e:
        # This error is caused by either a test file with an invalid UTF-8 byte
        # sequence, or if python makes the wrong assumption about the encoding
        # of a test file (this can happen if a test file has valid UTF-8 but
        # none of the locale environment variables LANG, LC_CTYPE, or LC_ALL,
        # were defined prior to running btest).  However, if all test files
        # are ASCII, then this error should never occur.
        error("unicode decode error in file %s: %s" % (filename, e))

    for line in lines:

        if state == "test":
            m = RE_START_FILE.search(line)
            if m:
                state = "file"
                file = (m.group(1), [])
                continue

            m = RE_END_FILE.search(line)
            if m:
                error("%s: unexpected %sEND-FILE" % (filename, CommandPrefix))

            m = RE_START_NEXT_TEST.search(line)
            if not m:
                content += [line]
                continue

            t = newTest(content, previous)
            if not t:
                return []

            tests += [t]

            previous = t
            content = []

        elif state == "file":
            m = RE_END_FILE.search(line)
            if m:
                state = "test"
                files += [file]
                file = (None, [])
                continue

            file = (file[0], file[1] + [line])

        else:
            error("internal: unknown state %s" % state)

    if state == "file":
        files += [file]

    input.close()

    t = newTest(content, previous)
    if t:
        tests.append(t)

    for t in tests:
        if t:
            t.addFiles(files)

    return tests

def jOption(default):
    def func(option, opt_str, value, parser):
        if parser.rargs and not parser.rargs[0].startswith('-'):
            try:
                val = int(parser.rargs[0])
                parser.rargs.pop(0)
            except ValueError:
                val = default
        else:
            val = default

        setattr(parser.values, option.dest, val)

    return func

# Output markup language documenting tests.
def outputDocumentation(tests, format):
    def indent(i):
        return "    " * i

    def doc(t):
        return t.doc if t.doc else ["No documentation."]

    # The "sectionlist" ensures that sections are output in same order as
    # they appear in the "tests" list.
    sectionlist = []
    sections = {}

    for t in tests:
        ids = t.name.split(".")
        path = ".".join(ids[:-1])
        if path not in sectionlist:
            sectionlist.append(path)
        s = sections.setdefault(path, [])
        s.append(t)

    for s in sectionlist:
        tests = sections[s]

        if format == "rst":
            print("%s" % s)
            print("-" * len(s))
            print()

            for t in tests:
                print("%s``%s``:" % (indent(1), t.name))
                for d in doc(t):
                    print("%s%s" % (indent(2), d))
                print()

        if format == "md":
            print("# %s" % s)
            print()

            for t in tests:
                print("* `%s`:" % t.name)
                for d in doc(t):
                    print("%s%s" % (indent(1), d))

            print()

### Main

optparser = optparse.OptionParser(usage="%prog [options] <directories>", version=VERSION)
optparser.add_option("-U", "--update-baseline", action="store_const", dest="mode", const="UPDATE",
                     help="create a new baseline from the tests' output")
optparser.add_option("-u", "--update-interactive", action="store_const", dest="mode", const="UPDATE_INTERACTIVE",
                     help="interactively asks whether to update baseline for a failed test")
optparser.add_option("-d", "--diagnostics", action="store_true", dest="diag", default=False,
                     help="show diagnostic output for failed tests")
optparser.add_option("-D", "--diagnostics-all", action="store_true", dest="diagall", default=False,
                     help="show diagnostic output for ALL tests")
optparser.add_option("-f", "--file-diagnostics", action="store", type="string", dest="diagfile", default="",
                     help="write diagnostic output for failed tests into file; if file exists, it is overwritten")
optparser.add_option("-v", "--verbose", action="store_true", dest="verbose", default=False,
                     help="show commands as they are executed")
optparser.add_option("-w", "--wait", action="store_true", dest="wait", default=False,
                     help="wait for <enter> after each failed (with -d) or all (with -D) tests")
optparser.add_option("-b", "--brief", action="store_true", dest="brief", default=False,
                     help="outputs only failed tests")
optparser.add_option("-c", "--config", action="store", type="string", dest="config", default=ConfigDefault,
                     help="configuration file")
optparser.add_option("-t", "--tmp-keep", action="store_true", dest="tmps", default=False,
                     help="do not delete tmp files created for running tests")
optparser.add_option("-j", "--jobs", action="callback", callback=jOption(multiprocessing.cpu_count()), dest="threads", default=0,
                     help="number of threads to run tests in simultaneously; 0 disables threading")
optparser.add_option("-g", "--groups", action="store", type="string", dest="groups", default="",
                     help="execute only tests of given comma-separated list of groups")
optparser.add_option("-r", "--rerun", action="store_true", dest="rerun", default=False,
                     help="execute commands for tests that failed last time")
optparser.add_option("-q", "--quiet", action="store_true", dest="quiet", default=False,
                     help="suppress information output other than about failed tests")
optparser.add_option("-x", "--xml", action="store", type="string", dest="xmlfile", default="",
                     help="write a report of test results in JUnit XML format to file; if file exists, it is overwritten")
optparser.add_option("-a", "--alternative", action="store", type="string", dest="alternatives", default=None,
                     help="activate given alternative")
optparser.add_option("-S", "--sphinx", action="store_true", dest="sphinx", default=False,
                     help="indicates that we're running from inside Sphinx; for internal purposes")
optparser.add_option("-T", "--update-times", action="store_true", dest="update_times", default=False,
                     help="create a new timing baseline for tests being measured")
optparser.add_option("-R", "--documentation", action="store", type="choice", dest="doc", choices=("rst", "md"), metavar="format", default=None,
                     help="Output documentation for tests, supported formats: rst, md")
optparser.add_option("-A", "--show-all", action="store_true", default=False,
                     help="For console output, show one-liners for passing/skipped tests in addition to any failing ones")
optparser.add_option("-z", "--retries", action="store", dest="retries", type="int", default=0,
                     help="Retry failed tests this many times to determine if they are unstable")

optparser.set_defaults(mode="TEST")
(Options, args) = optparser.parse_args()

if not os.path.exists(Options.config):
    error("configuration file '%s' not found" % Options.config)

# The defaults come from environment variables, plus a few additional items.
defaults = {}
# Changes to defaults should not change os.environ
defaults.update(os.environ)
defaults["default_path"] = os.environ["PATH"]

dirname = os.path.dirname(Options.config)
if not dirname:
    dirname = os.getcwd()

# If the BTEST_TEST_BASE envirnoment var is set, we'll use that as the testbase.
# If not, we'll use the current directory.
TestBase = os.path.abspath(os.environ.get("BTEST_TEST_BASE", dirname))
defaults["testbase"] = TestBase

# Parse our config
Config = getcfgparser(defaults)
Config.read(Options.config)

if Options.alternatives:
    # Preprocess to split into list.
    Options.alternatives = [alt.strip() for alt in Options.alternatives.split(",") if alt != "-"]

    # If TestBase wasn't explicitly specified as an environment
    # variable, we check to see if we're using an alternative that sets
    # it. We'll use the TestBase from the first matching alternative that
    # does so.
    if "BTEST_TEST_BASE" not in os.environ:
        for tag in Options.alternatives:
            base = getOption("BTEST_TEST_BASE", None, section="environment-%s" % tag)
            if base is None:
                continue

            TestBase = os.path.abspath(base)
            defaults["testbase"] = TestBase

            # At this point, our defaults have changed, so we
            # reread the configuration.
            Config = getcfgparser(defaults)
            Config.read(Options.config)

            # Now we continue with normal config file processing.
            break

os.chdir(TestBase)

if Options.sphinx:
    Options.quiet = True

if Options.quiet:
    Options.brief = True

# Determine output handlers to use.

output_handlers = []

if Options.verbose:
    output_handlers += [Verbose(Options, )]

elif Options.brief:
    output_handlers += [Brief(Options, )]

else:
    if sys.stdout.isatty():
        if Options.show_all:
            output_handlers += [Console(Options, )]
        else:
            output_handlers += [CompactConsole(Options, )]
    else:
        output_handlers += [Standard(Options, )]

if Options.diagall:
    output_handlers += [Diag(Options, True, None)]

elif Options.diag:
    output_handlers += [Diag(Options, False, None)]

if Options.diagfile:
    try:
        diagfile = open(Options.diagfile, "w", 1)
        output_handlers += [Diag(Options, Options.diagall, diagfile)]

    except IOError as e:
        print("cannot open %s: %s" % (Options.diagfile, e), file=sys.stderr)

if Options.sphinx:
    output_handlers += [SphinxOutput(Options)]

if Options.xmlfile:
    try:
        xmlfile = open(Options.xmlfile, "w", 1)
        output_handlers += [XMLReport(Options, xmlfile)]

    except IOError as e:
        print("cannot open %s: %s" % (Options.xmlfile, e), file=sys.stderr)

output_handler = Forwarder(Options, output_handlers)

# Determine Timer to use.

Timer = None

if platform() == "Linux":
    t = LinuxTimer()
    if t.available():
        Timer = t

if Options.update_times and not Timer:
    warning("unable to create timing baseline because timer is not available")

# Evaluate other command line options.

if Config.has_section("environment"):
    for (name, value) in Config.itemsNoDefaults("environment"):
        # Here we don't want to include items from defaults
        os.environ[name.upper()] = value

Alternatives = {}

if Options.alternatives:
    for tag in Options.alternatives:
        a = Alternative(tag)

        try:
            for (name, value) in Config.itemsNoDefaults("filter-%s" % tag):
                a.filters[name] = value

        except configparser.NoSectionError:
            pass

        try:
            for (name, value) in Config.itemsNoDefaults("substitution-%s" % tag):
                a.substitutions[name] = value

        except configparser.NoSectionError:
            pass

        try:
            for (name, value) in Config.itemsNoDefaults("environment-%s" % tag):
                a.envs[name] = value

        except configparser.NoSectionError:
            pass

        Alternatives[tag] = a

CommandPrefix = getOption("CommandPrefix", "@TEST-")

RE_INPUT = re.compile("%INPUT")
RE_DIR = re.compile("%DIR")
RE_ENV = re.compile("\$\{(\w+)\}")
RE_PART = re.compile("^(.*)#([0-9]+)$")
RE_IGNORE = re.compile(CommandPrefix + "IGNORE")
RE_START_NEXT_TEST = re.compile(CommandPrefix + "START-NEXT")
RE_START_FILE = re.compile(CommandPrefix + "START-FILE +([^\n ]*)")
RE_END_FILE = re.compile(CommandPrefix + "END-FILE")

# Commands as tuple (tag, regexp, more-than-one-is-ok, optional, group-main, group-add)
RE_EXEC                = ("exec",            re.compile(CommandPrefix + "EXEC(-FAIL)?: *(.*)"), True, False, 2, 1)
RE_REQUIRES            = ("requires",        re.compile(CommandPrefix + "REQUIRES: *(.*)"), True, True, 1, -1)
RE_GROUP               = ("group",           re.compile(CommandPrefix + "GROUP: *(.*)"), True, True, 1, -1)
RE_SERIALIZE           = ("serialize",       re.compile(CommandPrefix + "SERIALIZE: *(.*)"), False, True, 1, -1)
RE_PORT                = ("port",            re.compile(CommandPrefix + "PORT: *(.*)"), True, True, 1, -1)
RE_INCLUDE_ALTERNATIVE = ("alternative",     re.compile(CommandPrefix + "ALTERNATIVE: *(.*)"), True, True, 1, -1)
RE_IGNORE_ALTERNATIVE  = ("not-alternative", re.compile(CommandPrefix + "NOT-ALTERNATIVE: *(.*)"), True, True, 1, -1)
RE_COPY_FILE           = ("copy-file",       re.compile(CommandPrefix + "COPY-FILE: *(.*)"), True, True, 1, -1)
RE_KNOWN_FAILURE       = ("known-failure",   re.compile(CommandPrefix + "KNOWN-FAILURE"), False, True, -1, -1)
RE_MEASURE_TIME        = ("measure-time",    re.compile(CommandPrefix + "MEASURE-TIME"), False, True, -1, -1)
RE_DOC                 = ("doc",             re.compile(CommandPrefix + "DOC: *(.*)"), True, True, 1, -1)

Commands = (RE_EXEC, RE_REQUIRES, RE_GROUP, RE_SERIALIZE, RE_PORT, RE_INCLUDE_ALTERNATIVE, RE_IGNORE_ALTERNATIVE, RE_COPY_FILE, RE_KNOWN_FAILURE, RE_MEASURE_TIME, RE_DOC)

StateFile = os.path.abspath(getOption("StateFile", os.path.join(defaults["testbase"], ".btest.failed.dat")))
TmpDir = os.path.abspath(getOption("TmpDir", os.path.join(defaults["testbase"], ".tmp")))
BaselineDir = os.path.abspath(getOption("BaselineDir", os.path.join(defaults["testbase"], "Baseline")))
Initializer = getOption("Initializer", "")
BaselineTimingDir = os.path.abspath(getOption("TimingBaselineDir", os.path.join(BaselineDir, "_Timing")))
Finalizer = getOption("Finalizer", "")
PartFinalizer = getOption("PartFinalizer", "")

Config.configured_tests = []

testdirs = getOption("TestDirs", "").split()
if testdirs:
    Config.configured_tests = findTests(testdirs, True)

if args:
    tests = findTests(args)

else:
    if Options.rerun:
        (success, tests) = readStateFile()

        if success:
            if not tests:
                output("no tests failed last time")
                sys.exit(0)

        else:
            warning("cannot read state file, executing all tests")
            tests = Config.configured_tests

    else:
        tests = Config.configured_tests

if Options.groups:
    groups = Options.groups.split(",")
    Options.groups = set([g for g in groups if not g.startswith("-")])
    Options.no_groups = set([g[1:] for g in groups if g.startswith("-")])

    def rightGroup(t):
        if not t:
            return True

        if t.groups & Options.groups:
            return True

        if "" in Options.no_groups:
            if not t.groups:
                return True

        elif Options.no_groups:
            if (t.groups & Options.no_groups):
                return False

            return True

        return False

    tests = [t for t in tests if rightGroup(t)]

if not tests:
    output("no tests to execute")
    sys.exit(0)

tests = mergeTestParts(tests)

if Options.doc:
    outputDocumentation(tests, Options.doc)
    sys.exit(0)

mkdir(BaselineDir)
mkdir(TmpDir)

try:
    # Building our own path to avoid "error: AF_UNIX path too long" on
    # some platforms. See BIT-862.
    sname = "btest-socket-%d" % os.getpid()
    addr = os.path.join(tempfile.gettempdir(), sname)

    # Check if the pathname is too long to fit in struct sockaddr_un (the
    # maximum length is system-dependent, so here we just use 100, which seems
    # a safe default choice).
    if len(addr) > 100:
        # Try relative path to TmpDir (which would usually be ".tmp").
        addr = os.path.join(os.path.relpath(TmpDir), sname)

        # If the path is still too long, then use the global tmp directory.
        if len(addr) > 100:
            addr = os.path.join("/tmp", sname)

    (succeeded, failed, skipped, unstable) = TestManager(address=addr).run(copy.deepcopy(tests), output_handler)
    total = succeeded + failed + skipped
    output_handler.finished()

except (KeyboardInterrupt, IOError): # Ctrl-C can lead to broken pipe.
    output_handler.finished()
    print("Aborted.", file=sys.stderr)
    os._exit(1)

skip = (", %d skipped" % skipped) if skipped > 0 else ""
unstablestr = (", %d unstable" % unstable) if unstable > 0 else ""

if failed > 0:
    if not Options.quiet:
        output("%d of %d test%s failed%s%s" % (failed, total, "s" if total > 1 else "", skip, unstablestr))

    sys.exit(1)

elif skipped > 0 or unstable > 0:
    if not Options.quiet:
        output("%d test%s successful%s%s" % (succeeded, "s" if succeeded != 1 else "", skip, unstablestr))

    sys.exit(0)

else:
    if not Options.quiet:
        output("all %d tests successful" % total)

    sys.exit(0)
